package cn.ticsmyc.tools.multiThread.masterSlave;

import cn.ticsmyc.tools.multiThread.twoPhaseTermination.AbstractTerminatableThread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * General-purpose slave implementation: a worker thread that takes tasks from a
 * {@link BlockingQueue} and runs them one at a time.
 *
 * <p>Every task accepted by {@link #submit(Object)} is counted in
 * {@code terminationToken.reservations}; the counter is decremented once the task has
 * been taken from the queue and run. NOTE(review): the run loop lives in
 * {@code AbstractTerminatableThread}, which is not visible here — presumably it uses
 * this counter to decide when the thread may stop; confirm against the base class.
 *
 * @param <T> result type produced for a sub-task
 * @param <V> sub-task (input) type
 * @author Ticsmyc
 * @date 2021-06-04 10:51
 */
public abstract class SlaveWorkerThread<T, V> extends AbstractTerminatableThread implements SlaveSpec<T, V> {

    /** Queue of pending tasks; filled by {@link #submit(Object)} and drained by {@link #doRun()}. */
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * Creates a worker bound to the given task queue. The thread is not started until
     * {@link #init()} is called.
     *
     * @param taskQueue queue used to hand tasks over to this worker
     */
    public SlaveWorkerThread(BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    /** Starts the worker thread. */
    @Override
    public void init() {
        this.start();
    }

    /** Requests termination of the worker by delegating to {@code terminate(true)}. */
    @Override
    public void shutdown() {
        this.terminate(true);
    }

    /**
     * Wraps {@code task} in a {@link FutureTask} and enqueues it for this worker.
     *
     * <p>The reservation is counted <em>before</em> the task becomes visible in the queue.
     * Otherwise the worker could take, run and un-reserve the task before the producer
     * had counted it, leaving the counter temporarily too low while work is pending.
     * If enqueueing fails, the reservation is rolled back.
     *
     * @param task the sub-task to process
     * @return a future completed with the result of {@link #doProcess(Object)}; if
     *         processing throws, the future fails with a {@link SubTaskFailureException}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for space in the queue
     * @throws SubTaskFailureException declared by the implemented interface; failures of
     *                                 the task itself are reported through the future
     */
    @Override
    public Future<T> submit(V task) throws InterruptedException, SubTaskFailureException {
        FutureTask<T> futureTask = new FutureTask<T>(() -> {
            try {
                return doProcess(task);
            } catch (Exception e) {
                throw getSubTaskFailureException(task, e);
            }
        });

        // Reserve first so a queued task is never "invisible" to the reservation counter.
        this.terminationToken.reservations.incrementAndGet();
        boolean queued = false;
        try {
            taskQueue.put(futureTask);
            queued = true;
        } finally {
            if (!queued) {
                // put() failed (e.g. interrupted): nothing was enqueued, undo the reservation.
                this.terminationToken.reservations.decrementAndGet();
            }
        }
        return futureTask;
    }

    /**
     * Builds the exception reported when processing of {@code task} fails. The attached
     * {@link RetryInfo} carries the task and a callable that re-invokes
     * {@link #doProcess(Object)} on it.
     *
     * @param task the sub-task whose processing failed
     * @param e    the failure raised by {@link #doProcess(Object)}
     * @return the exception wrapping the retry information and the original cause
     */
    protected SubTaskFailureException getSubTaskFailureException(V task, Exception e) {
        RetryInfo<T, V> tvRetryInfo = new RetryInfo<T, V>(task, () -> doProcess(task));
        return new SubTaskFailureException(tvRetryInfo, e);
    }

    /**
     * Processes a single sub-task. Invoked on the worker thread for every submitted task.
     *
     * @param task the sub-task to process
     * @return the result for this sub-task
     */
    abstract protected T doProcess(V task);

    /**
     * Takes one task from the queue and runs it.
     *
     * <p>The reservation counter is decremented only when a task was actually taken.
     * An interrupted {@code take()} consumed nothing, so decrementing in that case would
     * under-count the tasks still waiting in the queue.
     *
     * @throws InterruptedException never thrown by this implementation; kept for the
     *                              overridden signature
     */
    @Override
    protected void doRun() throws InterruptedException {
        final Runnable pending;
        try {
            pending = taskQueue.take();
        } catch (InterruptedException e) {
            // The interrupt is deliberately swallowed (as before) and the flag is left
            // cleared, so that a later take() can still succeed if tasks remain queued.
            // NOTE(review): assumes the base class loop re-checks its termination
            // condition before calling doRun() again — confirm.
            System.out.println(Thread.currentThread().getName() + " 停止");
            return;
        }

        try {
            pending.run();
        } finally {
            // One queued task has been consumed, whether it succeeded or failed.
            terminationToken.reservations.decrementAndGet();
        }
    }
}
